In [1]:
from __future__ import absolute_import
from __future__ import print_function

from tornado import gen
from flowz import app
from flowz.channels import *

Introduction to Channels

Channel

A Channel gives a means of asynchronous iteration over items coming from some upstream source.

A consumer of a Channel uses its next() method to iteratively receive items as the channel makes them available; when the channel is exhausted, subsequent invocations of next() result in a ChannelDone exception. Those calls to next(), however, would need to be made in the context of a tornado coroutine, which gets complex pretty quickly.

In practice, channels are, instead, set up declaratively and passed to an app.Flo object to run exhaustively through them via the run() method. That behavior will generally be used in this guide via functions like this (though more complex in future chapters):


In [2]:
def print_chans(*chans):
    app.Flo([chan.map(print) for chan in chans]).run()

Also, channels are meant to refer to other channels to build up a graph of dependencies. flowz provides the ability to "tee" a channel to create a second independent iterator that enables safe dependencies. That feature is one of the things that makes flowz useful in ways that other options (e.g., Python 3 async iterators) are not.

The plain Channel class is effectivley abstract, with many useful implementations via subclasses. Many of the important subclasses will be covered in this guide.

IterChannel

An IterChannel converts an iterable into a channel. This is particularly helpful in this guide for illustrative purposes:


In [3]:
chan = IterChannel(range(5))
print_chans(chan)


0
1
2
3
4

Those innocuous lines did the following things:

  1. Wrapped a range iterator with an IterChannel.
  2. Wrapped that channel with a MapChannel (see below) that will print each element in the wrapped channel.
  3. Passed that MapChannel to an app.Flo object.
  4. Fully iterated (asychronously) over that channel.

For now, that seems like a lot a hullabaloo for iterating over five numbers, but the value will become clearer as time goes on.

TeeChannel [.tee()]

A TeeChannel wraps another channel to provide a way to independently iterate over the wrapped channel starting from the same point forwards. It doesn't create a copy of the channel or the objects in it; it just presents the same objects in the same order via an independent iterator.

In practice, you would never use the TeeChannel class directly; you would, instead, just call .tee() on an existing channel.


In [4]:
chan1 = IterChannel(range(5))
chan2 = chan1.tee()
print_chans(chan1)
print('----')
print_chans(chan2)


0
1
2
3
4
----
0
1
2
3
4

A slight modification of that example gives a first chance to demonstrate the asynchronous nature of the iteration.


In [5]:
chan1 = IterChannel(range(5))
chan2 = chan1.tee()
print_chans(chan1, chan2)


0
0
1
1
2
2
3
3
4
4

Note that the printing of the numbers interleaves between the two channels. That is a consequence of the logic running on a tornado loop with each channel yielding control back and forth to each other (and any other coroutines involved). That asynchrony becomes very important and useful as the channels get more complicated and involve accessing cloud-based storage and other sources that are best accessed asynchronously themselves.

It is important when constructing a graph of flowz channels to make sure that each channel and each tee of a channel is consumed by exactly one consumer. If two consumers iterate the same channel or tee, indeterminate behavior will result. And if no consumer iterates a channel or tee, it will cause all of the objects iterated in other tees of the same channel to be kept in memory until the unconsumed channel is garbage collected.

See the later chapter on channel managers for additional tools to make this easier.

MapChannel [.map(mapper)]

A MapChannel wraps another channel and applies a function to each of the elements in the underlying channel.


In [6]:
chan = MapChannel(IterChannel(range(5)), lambda x: x*2)
print_chans(chan)


0
2
4
6
8

The same logic can be performed on any channel with the helper method .map(mapper):


In [7]:
chan = IterChannel(range(5)).map(lambda x: x*2)
print_chans(chan)


0
2
4
6
8

Almost all of the useful Channel subclasses are accessible via helper methods on the Channel class, so the rest of this guide will generally demonstrate the helper methods only.

Indexing

An occasionally handy variant on mapping is to use the standard python indexing operator [] to perform the indexing operation on each element of the channel.


In [8]:
chan = IterChannel(({'first': 'John', 'last': 'Cleese'}, {'first': 'Eric', 'last': 'Idle'}))['last']
print_chans(chan)


Cleese
Idle

FlatMapChannel [.flat_map(mapper)]

A variant on the MapChannel is a FlatMapChannel. Its mapper can return an iterable, and the items will be emitted one by one by the channel. Note the difference in behavior here:


In [9]:
chan = IterChannel(range(5))
map_chan = chan.map(lambda x: [x]*x)
flat_map_chan = chan.tee().flat_map(lambda x: [x]*x)
print_chans(map_chan)
print('----')
print_chans(flat_map_chan)


[]
[1]
[2, 2]
[3, 3, 3]
[4, 4, 4, 4]
----
1
2
2
3
3
3
4
4
4
4

FilterChannel [.filter(predicate)]

A FilterChannel wraps another channel and applies a function to each of the elements in the underlying channel, passing through the element only if the function returns true.


In [10]:
chan = IterChannel(range(5)).filter(lambda x: x % 2 == 0)
print_chans(chan)


0
2
4

And since these examples are looking a lot like the standard map/filter examples in Python tutorials, we might as well string them together!


In [11]:
chan = IterChannel(range(5)).filter(lambda x: x % 2 == 0).map(lambda x: x*2)
print_chans(chan)


0
4
8

ZipChannel [.zip(*channels)]

A ZipChannel returns the items from multiple channels grouped together in a way akin to the built-in zip function. In the resulting ZipChannel, the items in all the channels specified will be zipped together on a per-item basis. The channel on which you're invoking zip will be the first, and items from the other channels will follow their order of specification in parameters.


In [12]:
chan1 = IterChannel(range(5))
chan2 = chan1.tee().map(lambda x: x * 2)
chan3 = chan1.tee().map(lambda x: x ** 2)
print_chans(chan1.zip(chan2, chan3))


(0, 0, 0)
(1, 2, 1)
(2, 4, 4)
(3, 6, 9)
(4, 8, 16)

ChainChannel [.chain(*channels)]

A ChainChannel simply chains together multiple channels into one channel, as though concatenating them.


In [13]:
print_chans(IterChannel(range(3)).chain(IterChannel(range(10,13)), IterChannel(range(100,103))))


0
1
2
10
11
12
100
101
102

ObserveChannel [.observe(observer)]

An ObserveChannel wraps another channel and passes along its items untouched, but also has the opportunity to run its observer function against them.


In [14]:
print_chans(IterChannel(range(3)).observe(lambda x: print('I saw %d' % x)))


I saw 0
0
I saw 1
1
I saw 2
2

GroupChannel [.groupby(key_func)]

A GroupChannel wraps another channel and organizes its items into groups (tuples) based on the key returned for each when the key_func is applied.


In [15]:
print_chans(IterChannel(range(20)).groupby(lambda x: x // 5))


(0, [0, 1, 2, 3, 4])
(1, [5, 6, 7, 8, 9])
(2, [10, 11, 12, 13, 14])
(3, [15, 16, 17, 18, 19])

For groupby() to work as expected, the channel must already be organized in the order (ascending or descending) of the desired group keys. Note how the alteration of the above example to group by "mod 5" rather than "div 5" is not pleasant:


In [16]:
print_chans(IterChannel(range(20)).groupby(lambda x: x % 5))


(0, [0])
(1, [1])
(2, [2])
(3, [3])
(4, [4])
(0, [5])
(1, [6])
(2, [7])
(3, [8])
(4, [9])
(0, [10])
(1, [11])
(2, [12])
(3, [13])
(4, [14])
(0, [15])
(1, [16])
(2, [17])
(3, [18])
(4, [19])

Rolling windows [.windowby(rolling(window_size))]

An alternate form of grouping that does not have a dedicated class, but instead relies on a helper function passed to the WindowChannel class (the superclass of GroupChannel), works as follows to produce rolling windows:


In [17]:
from flowz.channels import tools as chtools

In [18]:
print_chans(IterChannel(range(10)).windowby(chtools.rolling(5)))


(0, [0])
(1, [0, 1])
(2, [0, 1, 2])
(3, [0, 1, 2, 3])
(4, [0, 1, 2, 3, 4])
(5, [1, 2, 3, 4, 5])
(6, [2, 3, 4, 5, 6])
(7, [3, 4, 5, 6, 7])
(8, [4, 5, 6, 7, 8])
(9, [5, 6, 7, 8, 9])
(10, [6, 7, 8, 9])
(11, [7, 8, 9])
(12, [8, 9])
(13, [9])

Note that the objects are progressively added into rolling windows until they reach the target size, and then they taper off again at the very end. If you want to only deal with groups of the exact size, you can do this:


In [19]:
print_chans(IterChannel(range(10)).windowby(chtools.rolling(5)).filter(chtools.exact_group_size(5)))


(4, [0, 1, 2, 3, 4])
(5, [1, 2, 3, 4, 5])
(6, [2, 3, 4, 5, 6])
(7, [3, 4, 5, 6, 7])
(8, [4, 5, 6, 7, 8])
(9, [5, 6, 7, 8, 9])

You can also use pin_group_size(lower,upper) to choose groups with a range of sizes

That's it for basic channel operations. The remaining channels and operations are best understood after learning about artifacts.